一、概述
Kafka是最初由Linkedin公司开发,是⼀个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式⽇志系统(也可以当做MQ系统),能够支撑海量数据的数据传递。在离线和实时的消息处理业务系统中。常⻅可以⽤于web/nginx⽇志、访问⽇志,消息服务等等,Linkedin于2010年贡献给了Apache基⾦会并成为顶级开源项⽬。常⽤于⽇志处理场景。
kafka服务器的功能相当于RocketMQ中的broker,kafka运行还需要一个类似于命名服务器的服务。在kafka安装目录中自带一个类似于命名服务器的工具,叫做zookeeper,它的作用是注册中心
和之前操作其他MQ产品相似,kakfa也是基于主题操作,操作之前需要先初始化topic。
http://kafka.apache.org/downloads
1.1 特性
(1)高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个主题可以
分多个分区, 消费组对分区进行消费操作;
(2)可扩展性:kafka集群支持热扩展;
(3)持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
(4)容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
(5)高并发:支持数千个客户端同时读写;
1.2 应用
(1)日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等;
(2)消息系统:解耦和生产者和消费者、缓存消息等;
(3)用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;
(4)运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
(5)流式处理:比如spark streaming和storm;
1.3 相关概念
Producer**
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
Consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
Topic
在Kafka中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为topic。如果把Kafka看做为一个数据库,topic可以理解为数据库中的一张表,topic的名字即为表名。
Partition
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
Partition offffset
每条消息都有一个当前Partition下唯一的64字节的offffset,它指明了这条消息的起始位置。
Replicas of partition
副本是一个分区的备份。副本不会被消费者消费,副本只用于防止数据丢失,即消费者不从为follower的partition中消费数据,而是从为leader的partition中读取数据。副本之间是一主多从的关系。
Broker
Kafka 集群包含一个或多个服务器,服务器节点称为broker。broker存储topic的数据。如果某topic有 N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
Leader
Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
Zookeeper
Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。
AR(Assigned Replicas)
分区中所有的副本统称为AR。
ISR(In-Sync Replicas)
所有与Leader部分保持一定程度的副(包括Leader副本在内)本组成ISR。
OSR(Out-of-Sync-Replicas)
与Leader副本同步滞后过多的副本。
HW(High Watermark)
高水位,标识了一个特定的offffset,消费者只能拉取到这个offffset之前的消息。
LEO(Log End Offffset)
即日志末端位移(log end offffset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。
二、使用示例
2.1 与springboot整合
依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>配置
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: order发送消息
@Service
public class MessageServiceKafkaImpl implements MessageService {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列(kafka),id:"+id);
kafkaTemplate.send("itheima2022",id);
}
/**
* 发送消息 带事务
*/
@GetMapping("/send/{input}")
public String sendToKafka(@PathVariable String input) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<Object, Object>> send = this.template.send(topic, input);
//return "send success";
// 带返回值的
//SendResult<Object, Object> result = send.get();
//return result.toString();
// 事务操作
// 当输入参数为“error”值时,进行了回滚操作。
template.executeInTransaction(t -> {
t.send(topic, input);
if ("error".equals(input)) {
throw new RuntimeException("input is error");
}
t.send(topic, input + " anthor");
return true;
});
return "send success";
}
@GetMapping("/sendt/{input}")
@Transactional(rollbackFor = RuntimeException.class) // 事务回滚,可代码,可注解
public String sendToKafka2(@PathVariable String input) throws ExecutionException, InterruptedException {
template.send(topic, input);
if ("error".equals(input)) {
throw new RuntimeException("input is error");
}
template.send(topic, input + " anthor");
return "send success";
}
}监听消息
@Component
public class MessageListener {
@KafkaListener(topics = "itheima2022")
public void onMessage(ConsumerRecord<String,String> record){
System.out.println("已完成短信发送业务(kafka),id:"+record.value());
}
}2.2 结合logback接收输出日志
定义kafka appender
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.HashMap;
import java.util.Map;
public class KafkaAppender extends AppenderBase<ILoggingEvent> {
//定义属性,可以从logback.xml配置⽂件中获取
private String topic,brokers;
private KafkaTemplate kafkaTemplate;
@Override
public void start() {
Map<String, Object> props = new HashMap<>();
//连接地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
//重试,0为不启⽤重试机制
props.put(ProducerConfig.RETRIES_CONFIG, 1);
//控制批处理⼤⼩,单位为字节
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//批量发送,延迟为1毫秒,启⽤该功能能有效减少⽣产者发送消息次数,从⽽提⾼并发量
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//⽣产者可以使⽤的最⼤内存字节来缓冲等待发送到服务器的记录
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
//键的序列化⽅式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,IntegerSerializer.class);
//值的序列化⽅式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
this.kafkaTemplate = new KafkaTemplate<Integer, String>(new DefaultKafkaProducerFactory<>(props));
super.start();
}
@Override
public void stop() {
super.stop();
}
@Override
protected void append(ILoggingEvent iLoggingEvent) {
kafkaTemplate.send(topic,iLoggingEvent.getMessage());
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getBrokers() {
return brokers;
}
public void setBrokers(String brokers) {
this.brokers = brokers;
}
}日志配置
<appender name="kafka" class="com.itheima.logdemo.utils.KafkaAppender">
<topic>demo</topic>
<brokers>39.98.133.153:9103</brokers>
</appender>2.3 使用Demo
依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Kafka 消息生产者
*/
public class ProducerFastStart {
// Kafka集群地址
private static final String brokerList = "localhost:9092";
// 主题名称-之前已经创建
private static final String topic = "heima-par";
public static void main(String[] args) {
Properties properties = new Properties();
// 设置key序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//另外一种写法
//properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 10);
// 设置值序列化器
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置集群地址
properties.put("bootstrap.servers", brokerList);
// KafkaProducer 线程安全
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!");
try {
producer.send(record);
//RecordMetadata recordMetadata = producer.send(record).get();
//System.out.println("part:" + recordMetadata.partition() + ";topic:" + recordMetadata.topic());
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
}消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* Kafka 消息消费者
*/
public class ConsumerFastStart {
// Kafka集群地址
private static final String brokerList = "127.0.0.1:9092";
// 主题名称-之前已经创建
private static final String topic = "heima-par";
// 消费组
private static
final String groupId = "group.demo";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers", brokerList);
properties.put("group.id", groupId);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}服务端参数配置
config/server.properties
itcast@Server-node:/mnt/d/kafka_2.12-2.2.1$ egrep 'zookeeper|listeners|broker.id|log.dir|log.dirs' config/server.properties
# broker的唯一标识符,如果不配置则自动生成,建议配置且一定要保证集群中必须唯一,默认-1
<NolebasePageProperties />
broker.id=0
# 监听列表,broker对外提供服务时绑定的IP和端口。多个以逗号隔开,如果监听器名称不是一个安全的协议, listener.security.protocol.map也必须设置。主机名称设置0.0.0.0绑定所有的接口,主机名称为空则绑定默认的接口。
listeners=PLAINTEXT://:9092
# it uses the value for "listeners" if configured. Otherwise, it will use the value
#advertised.listeners=PLAINTEXT://your.host.name:9092
#log.dirs=/tmp/kafka-logs
# 日志数据存放的目录,如果没有配置则使用log.dir,建议此项配置。
log.dirs=/tmp/kafka/log
# Zookeeper connection string (see zookeeper docs for details).
# 指明Zookeeper主机地址,如果zookeeper是集群则以逗号隔开
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
# 服务器接受单个消息的最大大小,默认1000012 约等于976.6KB。
message.max.bytes三、生产者介绍
3.1 数据生产流程
①、首先要构造一个 ProducerRecord 对象,该对象可以声明主题Topic、分区Partition、键 Key以及值 Value,主题和值是必须要声明的,分区和键可以不用指定。
②、调用send() 方法进行消息发送。
③、因为消息要到网络上进行传输,所以必须进行序列化,序列化器的作用就是把消息的 key 和value对象序列化成字节数组。后,生产者就知道该往哪个主题和分区发送记录了。
⑤、接着这条记录会被添加到一个记录批次里面,这个批次里所有的消息会被发送到相同的主题和分区。会有一个独立的线程来把这些记录批次发送到相应的 Broker 上。
③、Broker成功接收到消息,表示发送成功,返回消息的元数据(包括主题和分区信息以及记录在分区里的偏移量)。发送失败,可以选择重试或者直接抛出异常。
3.2 发送配置
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* Kafka消息生产者进一步分析
*/
public class KafkaProducerAnalysis {
private static final String brokerList = "localhost:9092";
private static final String topic = "heima";
public static Properties initConfig() {
Properties props = new Properties();
// 该属性指定 brokers 的地址清单,格式为 host:port。清单里不需要包含所有的 broker 地址,
// 生产者会从给定的 broker 里查找到其它 broker 的信息。——建议至少提供两个 broker 的信息,因为一旦其中一个宕机,生产者仍然能够连接到集群上。
props.put("bootstrap.servers", brokerList);
// 将 key 转换为字节数组的配置,必须设定为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,
// 生产者会用这个类把键对象序列化为字节数组。
// ——kafka 默认提供了 StringSerializer和 IntegerSerializer、ByteArraySerializer。当然也可以自定义序列化器。
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 和 key.serializer 一样,用于 value 的序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 用来设定KafkaProducer对应的客户端ID,默认为空,如果不设置KafkaProducer会自动生成一个非空字符串。
// 内容形式如:"producer-1"
props.put("client.id", "producer.client.id.demo");
return props;
}
public static Properties initNewConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
// 自定义分区器的使用
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DefinePartitioner.class.getName());
// 自定义拦截器使用
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptorPrefix.class.getName());
props.put(ProducerConfig.ACKS_CONFIG,0);
return props;
}
public static Properties initPerferConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
return props;
}
public static void main(String[] args) throws InterruptedException {
Properties props = initNewConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// KafkaProducer<String, String> producer = new KafkaProducer<>(props,
// new StringSerializer(), new StringSerializer());
//生成 ProducerRecord 对象,并制定 Topic,key 以及 value
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!");
try {
// 1、发送消息
producer.send(record);
// 2、同步发送
//通过send()发送完消息后返回一个Future对象,然后调用Future对象的get()方法等待kafka响应
//如果kafka正常响应,返回一个RecordMetadata对象,该对象存储消息的偏移量
// 如果kafka发生错误,无法正常响应,就会抛出异常,我们便可以进行异常处理
//producer.send(record).get();
// 3、异步发送
// producer.send(record, new Callback() {
// @Override
// public void onCompletion(RecordMetadata metadata, Exception exception) {
// if (exception == null) {
// System.out.println(metadata.partition() + ":" + metadata.offset());
// }
// }
// });
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
// TimeUnit.SECONDS.sleep(5);
}
}3.3 序列化器
消息要到网络上进行传输,必须进行序列化,而序列化器的作用就是如此。Kafka 提供了默认的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),还有整型(IntegerSerializer)和字节数组(BytesSerializer)序列化器,这些序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer)基本上能够满足大部分场景的需求。
自定义序列化器
import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;
/**
* 自定义序列化器
*/
public class CompanySerializer implements Serializer<Company> {
@Override
public void configure(Map configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, Company data) {
if (data == null) {
return null;
}
byte[] name, address;
try {
if (data.getName() != null) {
name = data.getName().getBytes("UTF-8");
} else {
name = new byte[0];
}
if (data.getAddress() != null) {
address = data.getAddress().getBytes("UTF-8");
} else {
address = new byte[0];
}
ByteBuffer buffer = ByteBuffer.
allocate(4 + 4 + name.length + address.length);
buffer.putInt(name.length);
buffer.put(name);
buffer.putInt(address.length);
buffer.put(address);
return buffer.array();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new byte[0];
}
@Override
public void close() {
}
}使用自定义序列化器
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerDefineSerializer {
public static final String brokerList = "localhost:9092";
public static final String topic = "heima";
public static void main(String[] args)
throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
CompanySerializer.class.getName());
// properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
// ProtostuffSerializer.class.getName());
properties.put("bootstrap.servers", brokerList);
KafkaProducer<String, Company> producer =
new KafkaProducer<>(properties);
Company company = Company.builder().name("kafka")
.address("北京").build();
// Company company = Company.builder().name("hiddenkafka")
// .address("China").telphone("13000000000").build();
ProducerRecord<String, Company> record =
new ProducerRecord<>(topic, company);
producer.send(record).get();
}
}3.4 分区器
本身kafka有自己的分区策略的,如果未指定,就会使用默认的分区策略:
Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions。如果Key相同的话,那么就会分配到统一分区。
自定义分区器
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 自定义分区器
*/
public class DefinePartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (null == keyBytes) {
return counter.getAndIncrement() % numPartitions;
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}自定义分区器使用
// 自定义分区器的使用 实现自定义分区器需要通过配置参数ProducerConfig.PARTITIONER_CLASS_CONFIG来实现
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DefinePartitioner.class.getNam e());3.5 拦截器
Producer拦截器(interceptor)和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
生产者拦截器可以用在消息发送前做一些准备工作。
使用场景
1、按照某个规则过滤掉不符合要求的消息
2、修改消息的内容
3、统计类需求
自定义拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* 自定义拦截器
*/
public class ProducerInterceptorPrefix implements
ProducerInterceptor<String, String> {
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord<String, String> onSend(
ProducerRecord<String, String> record) {
String modifiedValue = "prefix1-" + record.value();
return new ProducerRecord<>(record.topic(),
record.partition(), record.timestamp(),
record.key(), modifiedValue, record.headers());
// if (record.value().length() < 5) {
// throw new RuntimeException();
// }
// return record;
}
@Override
public void onAcknowledgement(
RecordMetadata recordMetadata,
Exception e) {
if (e == null) {
sendSuccess++;
} else {
sendFailure++;
}
}
@Override
public void close() {
double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);
System.out.println("[INFO] 发送成功率="
+ String.format("%f", successRatio * 100) + "%");
}
@Override
public void configure(Map<String, ?> map) {
}
}自定义拦截器使用
// 自定义拦截器使用 后需要在配置参数中指定这个拦截器,此参数的默认值为空
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerDefineSerializer.cla ss.getName());3.6 发送原理
消息发送的过程中,涉及到两个线程协同工作,主线程首先将业务数据封装成ProducerRecord对象,之后调用send()方法将消息放入RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程直接的缓冲区)中暂存,Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulator中取出消息并批量发送出去,需要注意的是,KafkaProducer是线程安全的,多个线程间可以共享使用同一个KafkaProducer对象
3.7 其他生产者参数
acks
指定分区中必须有多少个副本收到这条消息,之后生产者才会认为这条消息时写入成功。涉及到消息的可靠性和吞吐量之间的权衡。
- ack=0, 生产者在成功写入消息之前不会等待任何来自服务器的相应。如果出现问题生产者是感知不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
- ack=1,默认值为1,只要集群的首领节点收到消息,生产这就会收到一个来自服务器的成功响应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,如果收到写成功通知,此时首领节点还没来的及同步数据到follower节点,首领节点崩溃,就会导致数据丢失。
- ack=-1, 只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息。
注意:acks参数配置的是一个字符串类型,而不是整数类型
retries
生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,如果达到了 retires 设置的次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,可以通过 retry.backoff.ms 参数来修改这个时间间隔。
batch.size
当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也可能被发送。所以就算把 batch.size 设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置的太小,生产者会因为频繁发送消息而增加一些额外的开销
max.request.size
该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小。 broker 对可接收的消息最大值也有自己的限制( message.max.size ),所以两边的配置最好匹配,避免生产者发送的消息被 broker 拒绝
四、消费者介绍
4.1 消费者和消费组
Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个T1主题,该主题有4个分区;同时我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息
Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。
4.2 消息接收配置
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Kafka 消费者分析
*/
@Slf4j
public class KafkaConsumerAnalysis {
public static final String brokerList = "localhost:9092";
public static final String topic = "heima";
public static final String groupId = "group.heima";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig() {
Properties props = new Properties();
// 与KafkaProducer中设置保持一致 反序列化配置
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 必填参数,该参数和KafkaProducer中的相同,制定连接Kafka集群所需的broker地址清单,可以设置一个或者多个
props.put("bootstrap.servers", brokerList);
// 消费者隶属于的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义的名称
props.put("group.id", groupId);
// 指定KafkaConsumer对应的客户端ID,默认为空,如果不设置KafkaConsumer会自动生成一个非空字符串
props.put("client.id", "consumer.client.id.demo");
// 指定消费者拦截器
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,ConsumerInterceptorTTL.class.getName());
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Arrays.asList(topic));
// 正则订阅主题
//consumer.subscribe(Pattern.compile("heima*"));
// 指定订阅的分区
//consumer.assign(Arrays.asList(new TopicPartition("heima", 0)));
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
System.out.println("key = " + record.key() + ", value = " + record.value());
//do something to process record.
}
}
} catch (Exception e) {
log.error("occur exception ", e);
} finally {
consumer.close();
}
}
}4.3 反序列化
// 与KafkaProducer中设置保持一致 反序列化配置
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");4.4 位移提交
对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中的位置。
当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。
重复消费和消息丢失都是由offset不一致造成的
自动提交
让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。
需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。
同步提交
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 位移提交
*/
public class CheckOffsetAndCommit {
public static final String brokerList = "localhost:9092";
public static final String topic = "heima";
public static final String groupId = "group.heima";
private static AtomicBoolean running = new AtomicBoolean(true);
public static Properties initConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 手动提交开启
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(tp));
long lastConsumedOffset = -1;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
if (records.isEmpty()) {
break;
}
List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync();//同步提交消费位移
}
System.out.println("comsumed offset is " + lastConsumedOffset);
OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
System.out.println("commited offset is " + offsetAndMetadata.offset());
long posititon = consumer.position(tp);
System.out.println("the offset of the next record is " + posititon);
}
}异步提交
手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。
异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。
import com.heima.kafka.ConsumerClientConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 异步提交
*/
@Slf4j
public class OffsetCommitAsyncCallback extends ConsumerClientConfig {
private static AtomicBoolean running = new AtomicBoolean(true);
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//do some logical processing.
}
// 异步回调
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (exception == null) {
System.out.println(offsets);
} else {
log.error("fail to commit offsets {}", offsets, exception);
}
}
});
}
} finally {
consumer.close();
}
try {
while (running.get()) {
consumer.commitAsync();
}
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
}
}指定位移消费
seek()方法可以追踪以前的消费或者回溯消费。
import com.heima.kafka.ConsumerClientConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Set;
/**
* 指定位移消费
*/
public class SeekDemo extends ConsumerClientConfig {
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
// timeout参数设置多少合适?太短会使分区分配失败,太长又有可能造成一些不必要的等待
consumer.poll(Duration.ofMillis(2000));
// 获取消费者所分配到的分区
Set<TopicPartition> assignment = consumer.assignment();
System.out.println(assignment);
for (TopicPartition tp : assignment) {
// 参数partition表示分区,offset表示指定从分区的哪个位置开始消费
consumer.seek(tp, 10);
}
// consumer.seek(new TopicPartition(topic,0),10);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
//consume the record.
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ":" + record.value());
}
}
}
}增加判断是否分配到了分区:
import com.heima.kafka.ConsumerClientConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
/**
* 指定位移消费
*/
public class SeekDemoAssignment extends ConsumerClientConfig {
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
long start = System.currentTimeMillis();
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(assignment);
for (TopicPartition tp : assignment) {
consumer.seek(tp, 10);
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
//consume the record.
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ":" + record.value());
}
}
}
}指定从分区末尾开始消费
import com.heima.kafka.ConsumerClientConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
/**
* 指定位移开始消费 末尾
*/
public class SeekToEnd extends ConsumerClientConfig {
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
// 指定从分区末尾开始消费
Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);
for (TopicPartition tp : assignment) {
// 正常消费
//consumer.seek(tp, offsets.get(tp));
// 位移越界操作
consumer.seek(tp, offsets.get(tp) + 1);
}
System.out.println(assignment);
System.out.println(offsets);
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
//consume the record.
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ":" + record.value());
}
}
}
}4.5 再均衡监听器
再均衡是指分区的所属从一个消费者转移到另外一个消费者的行为,它为消费组具备了高可用性和伸缩性提供了保障,使得我们既方便又安全地删除消费组内的消费者或者往消费组内添加消费者。不过再均衡发生期间,消费者是无法拉取消息的。
import com.heima.kafka.ConsumerClientConfig;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 再均衡监听器
*/
public class CommitSyncInRebalance extends ConsumerClientConfig {
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 劲量避免重复消费
consumer.commitSync(currentOffsets);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//do nothing.
}
});
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ":" + record.value());
// 异步提交消费位移,在发生再均衡动作之前可以通过再均衡监听器的onPartitionsRevoked回调执行commitSync方法同步提交位移。
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
}
consumer.commitAsync(currentOffsets, null);
}
} finally {
consumer.close();
}
}
}4.6 消费者拦截器
消费者拦截器主要是在消费到消息或者在提交消费位移时进行的一些定制化的操作。
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 消费者拦截器
*/
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {
private static final long EXPIRE_INTERVAL = 10 * 1000;
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
System.out.println("before:" + records);
long now = System.currentTimeMillis();
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords
= new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
for (ConsumerRecord<String, String> record : tpRecords) {
if (now - record.timestamp() < EXPIRE_INTERVAL) {
newTpRecords.add(record);
}
}
if (!newTpRecords.isEmpty()) {
newRecords.put(tp, newTpRecords);
}
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) ->
System.out.println(tp + ":" + offset.offset()));
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}拦截器配置使用
// 指定消费者拦截器
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,ConsumerInterceptorTTL.class .getName());4.7 消费者参数配置
fetch.min.bytes
指定了消费者读取的最小数据量。参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻broker压力。
fetch.max.wait.ms
参数指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为500ms。
max.partition.fetch.bytes
参数指定了每个分区返回的最多字节数,默认为1M。也就是说,KafkaConsumer.poll()返回记录列表时,每个分区的记录字节数最多为1M。如果一个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。
max.poll.records
参数控制一个poll()调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量
五、主题
5.1 主题管理
# 创建主题
# localhost:2181 zookeeper所在的ip,zookeeper 必传参数,多个zookeeper用 ‘,’分开。
# partitions 用于设置主题分区数,每个线程处理一个分区数据
# replication-factor 用于设置主题副本数,每个副本分布在不通节点,不能超过总结点数。如你只有一个节点,但是创建时指定副本数为2,就会报错。
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic heima --partitions 2 --replication-factor 1
# 查看topic元数据信细的方法
/mnt/d/zookeeper-3.4.14$ bin/zkCli.sh -server localhost:2181
# 查看主题
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-topics.sh --list -- zookeeper localhost:2181
# 查看某个特定主题信息,不指定topic则查询所有 通过 --describe
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-topics.sh --describe -- zookeeper localhost:2181 --topic heima
# 修改主题
# 增加配置
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-topics.sh --alter -- zookeeper localhost:2181 --topic heima --config flush.messages=1
# 删除主题
# 若 delete.topic.enable=true 直接彻底删除该 Topic。
# 若 delete.topic.enable=false 如果当前Topic 没有使用过即没有传输过信息:可以彻底删除。 如果当前 Topic 有使用过即有过传输过信息:并没有真正删除 Topic 只是把这个 Topic 标记为删除(marked for deletion),重启 Kafka Server 后删除。
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-topics.sh --delete -- zookeeper localhost:2181 --topic heima
# 标记为 marked for deletion
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-topics.sh --list -- zookeeper localhost:21815.2 增加分区
# 增加分区数
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-topics.sh --alter -- zookeeper localhost:2181 --topic heima --partitions 3
# 修改分区数时,仅能增加分区个数。若是用其减少 partition 个数,则会报错5.3 分区副本分配
5.4 KafkaAdminClient应用
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* KafkaAdminClient应用
*/
public class KafkaAdminConfigOperation {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// describeTopicConfig();
// alterTopicConfig();
addTopicPartitions();
}
//Config(entries=[ConfigEntry(name=compression.type, value=producer, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=leader.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.downconversion.enable, value=true, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.insync.replicas, value=1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.jitter.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=cleanup.policy, value=delete, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.bytes, value=1073741824, source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.messages, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.format.version, value=2.0-IV1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=file.delete.delay.ms, value=60000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=max.message.bytes, value=1000012, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.compaction.lag.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=index.interval.bytes, value=4096, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=unclean.leader.election.enable, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.bytes, value=-1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=delete.retention.ms, value=86400000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.index.bytes, value=10485760, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])])
public static void describeTopicConfig() throws ExecutionException,
InterruptedException {
String brokerList = "localhost:9092";
String topic = "heima";
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
AdminClient client = AdminClient.create(props);
ConfigResource resource =
new ConfigResource(ConfigResource.Type.TOPIC, topic);
DescribeConfigsResult result =
client.describeConfigs(Collections.singleton(resource));
Config config = result.all().get().get(resource);
System.out.println(config);
client.close();
}
public static void alterTopicConfig() throws ExecutionException, InterruptedException {
String brokerList = "localhost:9092";
String topic = "heima";
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
AdminClient client = AdminClient.create(props);
ConfigResource resource =
new ConfigResource(ConfigResource.Type.TOPIC, topic);
ConfigEntry entry = new ConfigEntry("cleanup.policy", "compact");
Config config = new Config(Collections.singleton(entry));
Map<ConfigResource, Config> configs = new HashMap<>();
configs.put(resource, config);
AlterConfigsResult result = client.alterConfigs(configs);
result.all().get();
client.close();
}
public static void addTopicPartitions() throws ExecutionException, InterruptedException {
String brokerList = "localhost:9092";
String topic = "heima";
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
AdminClient client = AdminClient.create(props);
NewPartitions newPartitions = NewPartitions.increaseTo(5);
Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
newPartitionsMap.put(topic, newPartitions);
CreatePartitionsResult result = client.createPartitions(newPartitionsMap);
result.all().get();
client.close();
}
}六、分区
Kafka可以将主题划分为多个分区(Partition),会根据分区规则选择把消息存储到哪个分区中,只要如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。另外,多个订阅者可以从一个或者多个分区中同时消费数据,以支撑海量数据处理能力。
6.1 副本机制
由于Producer和Consumer都只会与Leader角色的分区副本相连,所以kafka需要以集群的组织形式提供主题下的消息高可用。kafka支持主备复制,所以消息具备高可用和持久性
一个分区可以有多个副本,这些副本保存在不同的broker上。每个分区的副本中都会有一个作为Leader。当一个broker失败时,Leader在这台broker上的分区都会变得不可用,kafka会自动移除Leader,再其他副本中选一个作为新的Leader。
在通常情况下,增加分区可以提供kafka集群的吞吐量。然而,也应该意识到集群的总分区数或是单台服务器上的分区数过多,会增加不可用及延迟的风险
6.2 分区Leader选举
如果某个分区的Leader挂了,那么其它跟随者将会进行选举产生一个新的leader,之后所有的读写就会转移到这个新的Leader上,在kafka中,其不是采用常见的多数选举的方式进行副本的Leader选举,而是会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica,已同步的副本)的集合,显然还有一些副本没有来得及同步。只有这个ISR列表里面的才有资格成为leader(先使用ISR里面的第一个,如果不行依次类推,因为ISR里面的是同步副本,消息是最完整且各个节点都是一样的)。 通过ISR,kafka需要的冗余度较低,可以容忍的失败数比较高。假设某个topic有f+1个副本,kafka可以容忍f个不可用,当然,如果全部ISR里面的副本都不可用,也可以选择其他可用的副本,只是存在数据的不一致。
6.3 分区重新分配
往已经部署好的Kafka集群里面添加机器是最正常不过的需求,而且添加起来非常地方便,需要做的事是从已经部署好的Kafka节点中复制相应的配置文件,然后把里面的broker id修改成全局唯一的,最后启动这个节点即可将它加入到现有Kafka集群中。
新添加的Kafka节点并不会自动地分配数据,所以无法分担集群的负载,除非我们新建一个topic。但是现在我们想手动将部分分区移到新添加的Kafka节点上,Kafka内部提供了相关的工具来重新分布某个topic的分区。
# 创建一个有三个节点的集群
/mnt/d/kafka-cluster/kafka-1$ bin/kafka-topics.sh --create -- zookeeper localhost:2181 --topic heima-par --partitions 3 --replication-factor 3
# 详情查看
/mnt/d/kafka-cluster/kafka-1$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic heima-par
# 主题heima-par再添加一个分区
itcast@Server-node:/mnt/d/kafka-cluster/kafka-1$ bin/kafka-topics.sh --alter -- zookeeper localhost:2181 --topic heima-pa r --partitions 4
# 查看详情
/mnt/d/kafka-cluster/kafka-1$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic heima-par
# 再添加一个broker节点
/mnt/d/kafka-cluster/kafka-1$ bin/kafka-topics.sh --describe -zookeeper localhost:2181 --topic heima-par
# 重新分配
# 需要将原先分布在broker 1-3节点上的分区重新分布到broker 1-4节点上,借助kafkareassign-partitions.sh工具生成reassign plan
# 求定义一个文件,里面说明哪些topic需要重新分区
/mnt/d/kafka-cluster/kafka-1$ cat reassign.json {"topics":[{"topic":"heima-par"}], "version":1 }
# 后使用 kafka-reassign-partitions.sh 工具生成reassign plan
# --generate 表示指定类型参数
# --topics-to-move-json-file 指定分区重分配对应的主题清单路径
/mnt/d/kafka-cluster/kafka-1$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to -move-json-file reassign.json --broker-list "0,1,2,3" --generate
# 命令输入两个Json字符串,第一个JSON内容为当前的分区副本分配情况,第二个为重新分配的候选方案,注意这里只是生成一份可行性的方案,并没有真正执行重分配的动作。
# 将第二个JSON内容保存到名为result.json文件里面(文件名不重要,文件格式也不一定要以json为结尾,只要保证内容是json即可),然后执行这些reassign plan:
{ "version": 1, "partitions": [ { "topic": "heima-par", "partition": 0, "replicas": [ 1,2,3 ],"log_dirs": [ "any", "any", "any" ] },{ "topic": "heima-par", "partition": 2, "replicas": [ 3,0,1 ],"log_dirs": [ "any", "any", "any" ] },{ "topic": "heima-par", "partition": 1, "replicas": [ 2,3,0 ],"log_dirs": [ "any", "any", "any" ] },{ "topic": "heima-par", "partition": 3, "replicas": [ 0,1,2 ],"log_dirs": [ "any", "any", "any" ] } ] }
# 执行分配策略
/mnt/d/kafka-cluster/kafka-1$ bin/kafka-reassign- partitions.sh --zookeeper localhost:2181 --reassignm ent-json-file result.json --execute
# 查看分区重新分配的进度
/mnt/d/kafka-cluster/kafka-1$ bin/kafka-reassign- partitions.sh --zookeeper localhost:2181 --reassignment-json-file result.json -- verify6.4 修改副本因子
实际项目中我们可能在创建topic时没有设置好正确的replication-factor,导致kafka集群虽然是高可用的,但是该topic在有broker宕机时,可能发生无法使用的情况。topic一旦使用又不能轻易删除重建,因此动态增加副本因子就成为最终的选择。
说明:kafka 1.0版本配置文件默认没有default.replication.factor=x, 因此如果创建topic时,不指定– replication-factor 想, 默认副本因子为1. 我们可以在自己的server.properties中配置上常用的副本因子,省去手动调整。例如设置default.replication.factor=3
// 配置topic的副本
{ "version":1, "partitions":[ {"topic":"heima","partition":0,"replicas":[0,1,2]}, {"topic":"heima","partition":1,"replicas":[0,1,2]}, {"topic":"heima","partition":2,"replicas":[0,1,2]} ] }执行脚本
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-reassign-partitions.sh -- zookeeper localhost:2181 --reassignment-json-file replication-factor.json -- execute验证
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-topics.sh --describe -- zookeeper localhost:2181 --topic topic07036.5 分区分配策略
按照Kafka默认的消费逻辑设定,一个分区只能被同一个消费组(ConsumerGroup)内的一个消费者消费,如果消费者过多,出现了消费者的数量大于分区的数量的情况,就会有消费者分配不到任何分区。
Kafka提供了消费者客户端参数partition.assignment.strategy用来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为:org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。除此之外,Kafka中还提供了另外两种分配策略: RoundRobinAssignor和StickyAssignor。消费者客户端参数partition.asssignment.strategy可以配置多个分配策略,彼此之间以逗号分隔。
RangeAssignor分配策略
RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个topic,RangeAssignor策略会将消费组内所有订阅这个topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。
RoundRobinAssignor分配策略
RoundRobinAssignor策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者。RoundRobinAssignor策略对应的partition.assignment.strategy参数值为:org.apache.kafka.clients.consumer.RoundRobinAssignor。
如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个topic,那么在分配分区的时候此消费者将分配不到这个topic的任何分区。
StickyAssignor分配策略
Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:
分区的分配要尽可能的均匀; 分区的分配尽可能的与上次分配的保持相同。
当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多。
假设消费组内有3个消费者:C0、C1和C2,它们都订阅了4个主题:t0、t1、t2、t3,并且每个主题有2个分区,也就是说整个消费组订阅了t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1这8个分区。最终的分配结果如下:
消费者C0:t0p0、t1p1、t3p0
消费者C1:t0p1、t2p0、t3p1
消费者C2:t1p0、t2p1假设此时消费者C1脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配。如果采用RoundRobinAssignor策略,那么此时的分配结果如下:
消费者C0:t0p0、t1p0、t2p0、t3p0
消费者C2:t0p1、t1p1、t2p1、t3p1如分配结果所示,RoundRobinAssignor策略会按照消费者C0和C2进行重新轮询分配。而如果此时使用的是StickyAssignor策略,那么分配结果为:
消费者C0:t0p0、t1p1、t3p0、t2p0 消费者C2:t1p0、t2p1、t0p1、t3p1以看到分配结果中保留了上一次分配中对于消费者C0和C2的所有分配结果,并将原来消费者C1的“负 担”分配给了剩余的两个消费者C0和C2,最终C0和C2的分配还保持了均衡。
自定义分配策略
需实现:org.apache.kafka.clients.consumer.internals.PartitionAssignor
继承自:org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
七、Kafka存储
7.1 存储结构概述
每一个partion(文件夹)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件里。但每一个段segment file消息数量不一定相等,这样的特性方便old segment file高速被删除。(默认情况下每一个文件大小为1G)
每一个partiton仅仅须要支持顺序读写即可了。segment文件生命周期由服务端配置參数决定。
partiton中segment文件存储结构
segment file组成:由2大部分组成。分别为index file和data file,此2个文件一一相应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.
segment文件命名规则:partion全局的第一个segment从0开始,兴许每一个segment文件名称为上一个segment文件最后一条消息的offset值。
数值最大为64位long大小。19位数字字符长度,没有数字用0填充。
7.2 日志索引
数据文件的分段
Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offffset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offffset命名。这样在查找指定offffset的Message的时候,用二分查找就可以定位到该Message在哪个段中。
偏移量索引
数据文件分段使得可以在一个较小的数据文件中查找对应offffset的Message了,但是这依然需要顺序扫描才能找到对应offffset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
比如:要查找绝对offffset为7的Message:
首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。 打开这个Segment的index文件,也是用二分查找找到offffset小于或者等于指定offffset的索引条目中最大的那个offffset。自然offffset为6的那个索引是我们要找的,通过索引文件我们知道offffset为6的Message在数据文件中的位置为9807。
打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offffset为7的那条Message。
这套机制是建立在offffset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。
一句话,Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性。
7.3 日志清理
日志删除
Kafka日志管理器允许定制删除策略。目前的策略是删除修改时间在N天之前的日志(按时间删除),也可以使用另外一个策略:保留最后的N GB数据的策略(按大小删除)。为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。 Kafka消费日志删除思想:Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用
# 启用删除策略 直接删除,删除后的消息不可恢复。可配置以下两个策略:
log.cleanup.policy=delete
# 清理超过指定时间清理:
log.retention.hours=16
# 超过指定大小后,删除旧的消息:
log.retention.bytes=1073741824日志压缩
将数据压缩,只保留每个key最后一个版本的数据。首先在broker的配置中设置log.cleaner.enable=true启用cleaner,这个默认是关闭的。在Topic的配置中设置log.cleanup.policy=compact启用压缩策略。
压缩后的offffset可能是不连续的,比如上图中没有5和7,因为这些offffset的消息被merge了,当从这些offffset消费消息时,将会拿到比这个offffset大的offffset对应的消息,比如,当试图获取offffset为5的消息时,实际上会拿到offffset为6的消息,并从这个位置开始消费。
这种策略只适合特俗场景,比如消息的key是用户ID,消息体是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。
压缩策略支持删除,当某个Key的最新版本的消息没有内容时,这个Key将被删除,这也符合以上逻辑。
7.4 磁盘存储优势
Kafka在设计的时候,采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且不允许修改已经写入的消息,这种方式属于典型的顺序写入此判断的操作,所以就算是Kafka使用磁盘作为存储介质,所能实现的额吞吐量也非常可观。
Kafka中大量使用页缓存,这页是Kafka实现高吞吐的重要因素之一。
除了消息顺序追加,页缓存等技术,Kafka还使用了零拷贝技术来进一步提升性能。“零拷贝技术”只用将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的订阅者时,都可以使用同一个页面缓存),避免了重复复制操作。如果有10个消费者,传统方式下,数据复制次数为4*10=40次,而使用“零拷贝技术”只需要1+10=11次,一次为从磁盘复制到页面缓存,10次表示10个消费者各自读取一次页面缓存。
八、高级应用
8.1 幂等性
所谓幂等性,就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,二使用Kafka的幂等性功能就可以避免这种情况。
幂等性是有条件的:
- 只能保证 Producer 在单个会话内不丟不重,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
- 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。
Producer 使用幂等性的示例非常简单,与正常情况下 Producer 使用相比变化不大,只需要把Producer 的配置 enable.idempotence 设置为 true 即可,如下所示:
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put("acks", "all");// 当 enable.idempotence 为 true,这里默认为 all
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic, "test");8.2 事务
幂等性并不能跨多个分区运作,而事务可以弥补这个缺憾,事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功部分失败的可能。为了实现事务,应用程序必须提供唯一的transactionalId,这个参数通过客户端程序来进行设定。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* Kafka Producer事务的使用
*/
public class ProducerTransactionSend {
public static final String topic = "heima";
public static final String brokerList = "localhost:9092";
public static final String transactionId = "transactionId";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
producer.initTransactions();
producer.beginTransaction();
try {
//处理业务逻辑并创建ProducerRecord
ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");
producer.send(record1);
//模拟事务回滚案例
System.out.println(1/0);
ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");
producer.send(record2);
ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3");
producer.send(record3);
//处理一些其它逻辑
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.abortTransaction();
}
producer.close();
}
}开启配置
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);前期准备
事务要求生产者开启幂等性特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时需要ProducerConfifig.ENABLE_IDEMPOTENCE_CONFIG设置为true(默认值为true),如果显示设置为false,则会抛出异常。
KafkaProducer提供了5个与事务相关的方法
//初始化事务,前提是配置了
transactionalId public void initTransactions()
//开启事务
public void beginTransaction()
//为消费者提供事务内的位移提交操作
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
//提交事务
public void commitTransaction()
//终止事务,类似于回滚
public void abortTransaction()8.3 控制器
在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:
ZooInspector管理
使用zookeeper图形化的客户端工具(ZooInspector)提供的jar来进行管理,启动如下:
1、定位到jar所在目录
2、运行jar文件 java -jar zookeeper-dev-ZooInspector.jar
3、连接Zookeeper
其中version在目前版本中固定为1,brokerid表示称为控制器的broker的id编号,timestamp表示竞选称为控制器时的时间戳。
在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选;如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。
Zookeeper中还有一个与控制器有关的/controller_epoch节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的controller_epoch值。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。
controller_epoch的初始值为1,即集群中第一个控制器的纪元为1,当控制器发生变更时,没选出一个新的控制器就将该字段值加1。每个和控制器交互的请求都会携带上controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么则说明已经有新的控制器当选了。由此可见,Kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。
具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:
1、监听partition相关的变化。
2、监听topic相关的变化。
3、监听broker相关的变化
8.4 可靠性保证
可靠性保证:确保系统在各种不同的环境下能够发生一致的行为
Kafka的保证
- 保证分区消息的顺序
- 如果使用同一个生产者往同一个分区写入消息,而且消息B在消息A之后写入
- 那么Kafka可以保证消息B的偏移量比消息A的偏移量大,而且消费者会先读取消息A再读取消息B
- 只有当消息被写入分区的所有同步副本时(文件系统缓存),它才被认为是已提交
- 生产者可以选择接收不同类型的确认,控制参数 acks
- 只要还有一个副本是活跃的,那么已提交的消息就不会丢失
- 消费者只能读取已经提交的消息
- 保证分区消息的顺序
失效副本
怎么样判定一个分区是否有副本是处于同步失效状态的呢?从Kafka 0.9.x版本开始通过唯一的一个参数replica.lag.time.max.ms(默认大小为10,000)来控制,当ISR中的一个follower副本滞后leader副本的时间超过参数replica.lag.time.max.ms指定的值时即判定为副本失效,需要将此follower副本剔出除ISR之外。具体实现原理很简单,当follower副本将leader副本的LEO(Log End Offffset,每个分区最后一条消息的位置)之前的日志全部同步时,则认为该follower副本已经追赶上leader副本,此时更新该副本的lastCaughtUpTimeMs标识。Kafka的副本管理器(ReplicaManager)启动时会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的lastCaughtUpTimeMs差值是否大于参数replica.lag.time.max.ms指定的值。千万不要错误的认为follower副本只要拉取leader副本的数据就会更新lastCaughtUpTimeMs,试想当leader副本的消息流入速度大于follower副本的拉取速度时,follower副本一直不断的拉取leader副本的消息也不能与leader副本同步,如果还将此follower副本置于ISR中,那么当leader副本失效,而选取此follower副本为新的leader副本,那么就会有严重的消息丢失
副本复制
Kafka 中的每个主题分区都被复制了 n 次,其中的 n 是主题的复制因子(replication factor)。这允许Kafka 在集群服务器发生故障时自动切换到这些副本,以便在出现故障时消息仍然可用。Kafka 的复制是以分区为粒度的,分区的预写日志被复制到 n 个服务器。 在 n 个副本中,一个副本作为 leader,其他副本成为 followers。顾名思义,producer 只能往 leader 分区上写数据(读也只能从 leader 分区上进行),followers 只按顺序从 leader 上复制日志。
一个副本可以不同步Leader有如下几个原因 慢副本:在一定周期时间内follower不能追赶上leader。最常见的原因之一是I / O瓶颈导致follower追加复制消息速度慢于从leader拉取速度。 卡住副本:在一定周期时间内follower停止从leader拉取请求。follower replica卡住了是由于GC暂停或follower失效或死亡。
新启动副本:当用户给主题增加副本因子时,新的follower不在同步副本列表中,直到他们完全赶上了leader日志。
如何确定副本是滞后的
replica.lag.max.messages=4
在服务端现在只有一个参数需要配置replica.lag.time.max.ms。这个参数解释replicas响应partitionleader的最长等待时间。检测卡住或失败副本的探测——如果一个replica失败导致发送拉取请求时间间隔超过replica.lag.time.max.ms。Kafka会认为此replica已经死亡会从同步副本列表从移除。检测慢副本机制发生了变化——如果一个replica开始落后leader超过replica.lag.time.max.ms。Kafka会认为太缓慢并且会从同步副本列表中移除。除非replica请求leader时间间隔大于replica.lag.time.max.ms,因此即使leader使流量激增和大批量写消息。Kafka也不会从同步副本列表从移除该副本。
8.5 一致性保证
在leader宕机后,只能从ISR列表中选取新的leader,无论ISR中哪个副本被选为新的leader,它都知道HW之前的数据,可以保证在切换了leader后,消费者可以继续看到HW之前已经提交的数据。
HW的截断机制:选出了新的leader,而新的leader并不能保证已经完全同步了之前leader的所有数据,只能保证HW之前的数据是同步过的,此时所有的follower都要将数据截断到HW的位置,再和新的leader同步数据,来保证数据一致。 当宕机的leader恢复,发现新的leader中的数据和自己持有的数据不一致,此时宕机的leader会将自己的数据截断到宕机之前的hw位置,然后同步新leader的数据。宕机的leader活过来也像follower一样同步数据,来保证数据的一致性。
Leader Epoch引用
于HW值被用于衡量副本备份的成功与否以及在出现failture时作为日志截断的依据,但HW值的更新是异步延迟的,特别是需要额外的FETCH请求处理流程才能更新,故这中间发生的任何崩溃都可能导致HW值的过期。可能会导致数据丢失和数据不一致问题
鉴于这些原因,Kafka 0.11引入了leader epoch来取代HW值。Leader端多开辟一段内存区域专门保存leader的epoch信息,这样即使出现上面的两个场景也能很好地规避这些问题。
所谓leader epoch实际上是一对值:(epoch,offffset)。epoch表示leader的版本号,从0开始,当leader变更过1次时epoch就会+1,而offffset则对应于该epoch版本的leader写入第一条消息的位移。因此假设有两对值:(0, 0)(1, 120)则表示第一个leader从位移0开始写入消息;共写了120条[0, 119];而第二个leader版本号是1,从位移120处开始写入消息。leader broker中会保存这样的一个缓存,并定期地写入到一个checkpoint文件中。
8.6 消息重复场景及解决方案
生产者端重复
生产发送的消息没有收到正确的broke响应,导致producer重试。
producer发出一条消息,broke落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后producer收到一个可恢复的Exception重试消息导致消息重复。
解决方案:
1、启动kafka的幂等性
要启动kafka的幂等性,无需修改代码,默认为关闭,需要修改配置文件:enable.idempotence=true 同时要求 ack=all 且 retries>1。
2、ack=0,不重试。
可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集。
消费者端重复
1、根本原因
数据消费完没有及时提交offffset到broker。
解决方案
1、取消自动自动提交
每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。
2、下游做幂等
一般的解决方案是让下游做幂等或者尽量每消费一条消息都记录offffset,对于少数严格的场景可能需要把offffset或唯一ID,例如订单ID和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费offffset,然后更新下游数据的时候用消费位点做乐观锁拒绝掉旧位点的数据更新
8.7 __consumer_offsets
_consumer_offsets是一个内部topic,对用户而言是透明的,除了它的数据文件以及偶尔在日志中出现这两点之外,用户一般是感觉不到这个topic的。不过我们的确知道它保存的是Kafka新版本consumer的位移信息。
何时创建
一般情况下,当集群中第一有消费者消费消息时会自动创建主题__consumer_offsets,分区数可以通过offsets.topic.num.partitions参数设定,默认值为50,
解析分区
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.*;
/**
* @Author dayuan
*/
public class ConsumerOffsetsAnalysis {
// Kafka集群地址
private static final String brokerList = "127.0.0.1:9092";
// 主题名称-之前已经创建
private static final String topic = "heima";
// 消费组
private static final String groupId = "group.demo";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("enable.auto.commit", "false");
properties.put("auto.offset.reset", "latest");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.put("bootstrap.servers", brokerList);
properties.put("group.id", groupId);
Consumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(properties);
consumer.subscribe(Collections.singletonList("__consumer_offsets"));
while (true) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
Map<String, Integer> map = new HashMap<>();
while (iterator.hasNext()) {
ConsumerRecord<byte[], byte[]> record = iterator.next();
if (record.key() == null) {
continue;
}
System.out.println("topic:" + record.topic() + ",partition:" + record.partition() + ",offset:" + record.offset());
}
}
}
}8.8 命令行工具
消费组管理
# 查看消费组
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-consumer-groups.sh -- bootstrap-server localhost:9092 --list
# 查看消费组详情
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-consumer-groups.sh -- bootstrap-server localhost:9092 --describe --group group.demo
# 查看消费组当前状态
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-consumer-groups.sh -- bootstrap-server localhost:9092 --describe --group group.demo --state
# 消费组内成员信息
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-consumer-groups.sh -- bootstrap-server localhost:9092 --describe --group group.demo --members
# 删除消费组 ,如果有消费者在使用则会失败
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-consumer-groups.sh -- bootstrap-server localhost:9092 --delete --group group.demo消费位移管理
重置消费位移,前提是没有消费者在消费
# :--all-topics指定了所有主题,可以修改为--topics,指定单个主题。
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-consumer-groups.sh -- bootstrap-server localhost:9092 --group g roup.demo --all-topics --reset-offsets --to-earliest --execute8.9 数据管道Connect
Kafka Connect是到0.9版本才提供的并极大的简化了其他系统与Kafka的集成。Kafka Connect运用用户快速定义并实现各种Connector(File,Jdbc,Hdfs等),这些功能让大批量数据导入/导出Kafka很方便
Connect中一些概念
连接器:实现了Connect API,决定需要运行多少个任务,按照任务来进行数据复制,从work进程获取任务配置并将其传递下去
任务:负责将数据移入或移出Kafka
work进程:相当与connector和任务的容器,用于负责管理连接器的配置、启动连接器和连接器任务,提供REST API
转换器:kafka connect和其他存储系统直接发送或者接受数据之间转换数据
独立模式--文件系统 示例
以下示例使用到了两个Connector,将文件source.txt 中的内容通过Source连接器写入Kafka主题中,然后将内容写入srouce.sink.txt中。
- FileStreamSource:从source.txt中读取并发布到Broker中
- FileStreamSink:从Broker中读取数据并写入到source.sink.txt文件中
步骤:
Worker进程用到的配置文件${KAFKA_HOME}/confifig/connect-standalone.properties
// Kafka集群连接的地址
bootstrap.servers=localhost:9092
// 格式转化类
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
// json消息中是否包含
schema key.converter.schemas.enable=true
value.converter.schemas.enable=true
// 保存偏移量的文件路径
offset.storage.file.filename=/tmp/connect.offsets
// 设定提交偏移量的频率
offset.flush.interval.ms=10000其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties
// 配置连接器的名称
name=local-file-source
// 连接器的全限定名称,设置类名称也是可以的
connector.class=FileStreamSource
// task数量
tasks.max=1
// 数据源的文件路径
file=/tmp/source.txt
// 主题名称
topic=topic0703其中的Sink使用到的配置文件是${KAFKA_HOME}/confifig/connect-fifile-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/source.sink.txt
topics=topic0703启动
# 启动source连接器
/mnt/d/kafka_2.12-2.2.1$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
# 启动slink连接器
/mnt/d/kafka_2.12-2.2.1$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
# source写入文本信息
/mnt/d/kafka_2.12-2.2.1$ echo "Hello kafka,I coming;">>/tmp/source.txt
# 查看slink文件
/mnt/d/kafka_2.12-2.2.1$ cat /tmp/source.sink.txt信息流--ElasticSearch
Kafka connect workers有两种工作模式,单机模式和分布式模式。在开发和适合使用单机模式的场景下,可以使用standalone模式, 在实际生产环境下由于单个worker的数据压力会比较大,distributed模式对负载均和和扩展性方面会有很大帮助。(本测试使用standalone模式)
关于Kafka Connect的详细情况可以参考[Kafka Connect]
Kafka Connect 安装
Worker配置
本测试使用standalone模式,因此修改../etc/schema-registry/connect-avro-standalone.properties
bootstrap.servers=localhost:9092Elasticsearch Connector配置
修改../etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
# 其中topics不仅对应Kafka的topic名称,同时也是Elasticsearch的索引名,
# 当然也可以通过topic.index.map来设置从topic名到Elasticsearch索引名的映射
topics=topic0703
key.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect启动
Elasticsearch
~$ curl 'http://localhost:9200/?pretty'
{
"name" : "MY-20190430BUDR",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "ha3pnLkhRuGEIgXQstYnbQ",
"version" : {
"number" : "7.2.0",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "508c38a",
"build_date" : "2019-06-20T15:54:18.811730Z",
"build_snapshot" : false,
"lucene_version" : "8.0.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}schema Registry
/mnt/d/confluent-5.3.0$ bin/schema-registry-start etc/schema- registry/schema-registry.properties查看服务是否正常
jps -l启动Connector
/mnt/d/confluent-5.3.0$ ./bin/connect-standalone etc/schema- registry/connect-avro-standalone.properties etc/kafka-connect- elasticsearch/quickstart-elasticsearch.properties8.10 流式处理Spark
Spark最初诞生于美国加州大学伯克利分校(UC Berkeley)的AMP实验室,是一个可应用于大规模数据处理的快速、通用引擎。
Spark最初的设计目标是使数据分析更快——不仅运行速度快,也要能快速、容易地编写程序。为了使程序运行更快,Spark提供了内存计算,减少了迭代计算时的IO开销;而为了使编写程序更为容易,Spark使用简练、优雅的Scala语言编写,基于Scala提供了交互式的编程体验
安装与应用
启动
/mnt/d/spark-2.4.3-bin-hadoop2.7$ sbin/start-all.sh starting org.apache.spark.deploy.master.Master, logging to /mnt/d/spark-2.4.3- bin-hadoop2.7/logs/spark-dayuan-org.apache.spark.deploy.master.Master-1-MY- 20190430BUDR.out验证
/mnt/d/spark-2.4.3-bin-hadoop2.7$ jps -l
# 浏览器输入 ip:8080验证Spark和Kafka整合
com.spark.SparkStreamingFromkafka
演示
# 发送消息
/mnt/d/kafka_2.12-2.2.1$ bin/kafka-console-producer.sh -- broker-list localhost:9092 --topic heima
# 接收消息8.11 Kafka优化
1、网络和io操作线程配置优化
broker处理消息的最大线程数(默认为3)
num.network.threads=cpu核数+1
broker处理磁盘IO的线程数
num.io.threads=cpu核数*2
2、log数据文件刷盘策略
每当producer写入10000条消息时,刷数据到磁盘log.flush.interval.messages=10000
每间隔1秒钟时间,刷数据到磁盘log.flush.interval.ms=1000
3、日志保留策略配置
保留三天,也可以更短 (log.cleaner.delete.retention.ms)log.retention.hours=72
段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件 log.segment.bytes=1073741824
4、Replica相关配置 default.replication.factor:3
这个参数指新创建一个topic时,默认的Replica数量,Replica过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在2~3为宜。
5、调优vm.max_map_count参数。
主要适用于Kafka broker上的主题数超多的情况。Kafka日志段的索引文件是用映射文件的机制来做的,故如果有超多日志段的话,这种索引文件数必然是很多的,极易打爆这个资源限制,所以对于这种情况一般要适当调大这个参数。
6、JVM堆大小
首先鉴于目前Kafka新版本已经不支持Java7了,而Java 8本身不更新了,甚至Java9其实都不做了,直接做Java10了,所以我建议Kafka至少搭配Java8来搭建。至于堆的大小,个人认为6-10G足矣。如果出现了堆溢出,就提jira给社区,让他们看到底是怎样的问题。因为这种情况下即使用户调大heap size,也只是延缓OOM而已,不太可能从根本上解决问题。
九、安装
9.1 docker安装
#docker启动
#启动zookeeper
docker run --name zookeeper \
-v /opt/data/zksingle:/data \
-p 2181:2181 \
-e ZOO_LOG4J_PROP="INFO,ROLLINGFILE" \
-d zookeeper:3.4.13
#启动kafka
docker run -d --name kafka \
-p 9103:9092 \
--link zookeeper:zookeeper \
--env KAFKA_BROKER_ID=100 \
--env HOST_IP=39.98.133.153 \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=39.98.133.153 \
--env KAFKA_ADVERTISED_PORT=9103 \
--restart=always \
--volume /etc/localtime:/etc/localtime \
wurstmeister/kafka:2.12-2.2.2
#使⽤zk节点数据验证启动情况
docker exec -it zookeeper sh
#进⼊zookeeper后查看节点信息
ls /brokers
#进⼊容器
docker exec -it kafka sh
/opt/kafka_2.12-2.2.2/bin
#客户端监听(该步会⾃动创建topic)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
#另起⼀个终端,验证发送
./kafka-console-producer.sh --broker-list localhost:9092 --topic demo9.2 常规安装
需先安装jdk和zookerrper
启动
bin/kafka-server-start.sh confifig/server.propertiesproperties# server.properties配置 # 表示broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同 broker.id=0 # brokder对外提供的服务入口地址 listeners=PLAINTEXT://:9092 # 设置存放消息日志文件的地址 log.dirs=/tmp/kafka/log # Kafka所需Zookeeper集群地址,教学中Zookeeper和Kafka都安装本机 zookeeper.connect=localhost:2181验证
sh# 创建一个主题 # --zookeeper:指定了Kafka所连接的Zookeeper服务地址 # --topic:指定了所要创建主题的名称 # --partitions:指定了分区个数 # --replication-factor:指定了副本因子 # --create:创建主题的动作指令 bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic heima --partitions 2 --replication-factor 1 # 展示所有主题 bin/kafka-topics.sh --zookeeper localhost:2181 --list # 查看主题详情 # --describe 查看详情动作指令 bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic heima # 消费端订阅消息 # --bootstrap-server 指定了连接Kafka集群的地址 # --topic 指定了消费端订阅的主题 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heima # 生产端发送消息 # --broker-list 指定了连接的Kafka集群的地址 # --topic 指定了发送消息时的主题 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic heima
9.3 集群安装
Kafka 是一个分布式消息系统,具有高水平扩展和高吞吐量的特点。在Kafka 集群中,没有 “中心主节点” 的概念,集群中所有的节点都是对等的。
Broker(代理)
每个 Broker 即一个 Kafka 服务实例,多个 Broker 构成一个 Kafka 集群,生产者发布的消息将保存在Broker 中,消费者将从 Broker 中拉取消息进行消费。
Kafka 强依赖于 ZooKeeper ,通过 ZooKeeper 管理自身集群,如:Broker 列表管理、Partition 与 Broker 的关系、Partition 与 Consumer 的关系、Producer 与 Consumer 负载均衡、消费进度 Offset 记录、消费者注册 等,所以为了达到高可用,ZooKeeper 自身也必须是集群。
ZooKeeper集群搭建 参考zookeeper技术
kafka集群
# broker 编号,集群内必须唯一
broker.id=1
# host 地址
host.name=127.0.0.1
# 端口
port=9092
# 消息日志存放地址
log.dirs=/tmp/kafka/log/cluster/log3
# ZooKeeper 地址,多个用,分隔
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183多集群同步
MirrorMaker是为解决Kafka跨集群同步、创建镜像集群而存在的;该工具消费源集群消息然后将数据重新推送到目标集群
使用MirrorMaker创建镜像是比较简单的,搭建好目标Kafka集群后,只需要启动mirror-maker程序即可。其中,一个或多个consumer配置文件、一个producer配置文件是必须的,whitelist、blacklist是可选的。在consumer的配置中指定源Kafka集群的Zookeeper,在producer的配置中指定目标集群的Zookeeper(或者broker.list)。
kafka-run-class.sh kafka.tools.MirrorMaker –consumer.config sourceCluster1Consumer.config –consumer.config sourceCluster2Consumer.config –num.streams 2 – producer.config targetClusterProducer.config –whitelist=“.*”consumer配置文件
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# consumer group id
group.id=test-consumer-group
# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=producer配置文件:
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none同步数据如何做到不丢失 首先发送到目标集群时需要确认:request.required.acks=1 发送时采用阻塞模式,否则缓冲区满了数据丢弃:queue.enqueue.timeout.ms=-1
9.4 kafka-manager
kafka-manager是⽬前最受欢迎的kafka集群管理⼯具,最早由雅⻁开源,⽤户可以在Web界⾯执⾏⼀些简单的集群管理操作。具体⽀持以下内容:
- 管理多个集群
- 轻松检查群集状态(主题,消费者,偏移,代理,副本分发,分区分发)
- 运⾏⾸选副本选举
- 使⽤选项⽣成分区分配以选择要使⽤的代理
- 运⾏分区重新分配(基于⽣成的分配)
- 使⽤可选主题配置创建主题(0.8.1.1具有与0.8.2+不同的配置)
- 删除主题(仅⽀持0.8.2+并记住在代理配置中设置delete.topic.enable = true)
- 主题列表现在指示标记为删除的主题(仅⽀持0.8.2+)
- 批量⽣成多个主题的分区分配,并可选择要使⽤的代理
- 批量运⾏重新分配多个主题的分区
- 将分区添加到现有主题
- 更新现有主题的配置
#解压
unzip kafka-manager-2.0.0.2.zip
#配置⽂件,修改⽬录下的conf/application.conf
kafka-manager.zkhosts="localhost:2181"
#启动,指定端⼝9104
km_home=./kafka-manager-2.0.0.2
nohup $km_home/bin/kafka-manager -Dconfig.file=$km_home/conf/application.conf -
Dhttp.port=9104 > /opt/logs/kibana.log &
tail -f /opt/logs/kibana.log
9.5 监控系统: Kafka Eagle
Kafka Eagle 用于监控 Kafka 集群中 Topic 被消费的情况。包含 Lag 的产生,Offset 的变动,Partition 的分布,Owner ,Topic 被创建的时间和修改的时间等信息。
配置
\conf\system-config.properties
###################################### # multi zookeeper&kafka cluster list ######################################
#如果只有一个集群的话,就写一个cluster1就行了
kafka.eagle.zk.cluster.alias=cluster1
#这里填上刚才上准备工作中的zookeeper.connect地址
cluster1.zk.list=localhost:2181
#如果多个集群,继续写,如果没有注释掉
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
###################################### # zk client thread limit ######################################
kafka.zk.limit.size=25
###################################### # kafka eagle webui port ######################################
###web界面地址端口 kafka.eagle.webui.port=8048 ######################################
# kafka offset storage ######################################
cluster1.kafka.eagle.offset.storage=kafka
#cluster2.kafka.eagle.offset.storage=zk
###################################### # enable kafka metrics ######################################
kafka.eagle.metrics.charts=false
kafka.eagle.sql.fix.error=false
###################################### # kafka sql topic records max ######################################
kafka.eagle.sql.topic.records.max=5000
###################################### # alarm email configure ######################################
#kafka.eagle.mail.enable=false
#kafka.eagle.mail.sa=alert_sa@163.com
#kafka.eagle.mail.username=alert_sa@163.com
#kafka.eagle.mail.password=mqslimczkdqabbbh
#kafka.eagle.mail.server.host=smtp.163.com
#kafka.eagle.mail.server.port=25
###################################### # alarm im configure ######################################
#kafka.eagle.im.dingding.enable=true
#kafka.eagle.im.dingding.url=https://oapi.dingtalk.com/robot/send?access_token=
#kafka.eagle.im.wechat.enable=true
#kafka.eagle.im.wechat.token=https://qyapi.weixin.qq.com/cgi-bin/gettoken? corpid=xxx&corpsecret=xxx
#kafka.eagle.im.wechat.url=https://qyapi.weixin.qq.com/cgi-bin/message/send? access_token= #kafka.eagle.im.wechat.touser=
#kafka.eagle.im.wechat.toparty= #kafka.eagle.im.wechat.totag= #kafka.eagle.im.wechat.agentid=
###################################### # delete kafka topic token ######################################
kafka.eagle.topic.token=keadmin
###################################### # kafka sasl authenticate ######################################
#cluster1.kafka.eagle.sasl.enable=false
#cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
#cluster1.kafka.eagle.sasl.mechanism=PLAIN
#cluster2.kafka.eagle.sasl.enable=false
#cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT #cluster2.kafka.eagle.sasl.mechanism=PLAIN
#cluster1.kafka.eagle.sasl.client=/mnt/d/kafka-eagle-web- 1.3.6/conf/kafka_client_jaas.conf
###################################### # kafka jdbc driver address######################################
kafka.eagle.driver=org.sqlite.JDBC
#这个地址,按照安装目录进行配置
kafka.eagle.url=jdbc:sqlite:D:/kafka-eagle-web-1.3.6/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=123456环境变量:
KE_HOME D:\kafka-eagle-web-1.3.6
启动
启动命令:D:\kafka-eagle-web-1.3.6\bin\ke.bat
访问
http://localhost:8048/ke 默认用户名:admin 密码:admin
9.6 java代码实现JMX监控
监控度量指标
JMX
Kafka自身提供的监控指标(包括broker和主题的指标,集群层面的指标通过各个broker的指标累加来获取)都可以通过JMX(Java Managent Extension)来进行获取。在使用JMX之前首先要确保Kafka开启了JMX的功能(默认是关闭的)
在使用jmx之前需要确保kafka开启了jmx监控,kafka启动时要添加JMX_PORT=9999
/mnt/d/kafka_2.12-2.2.1$ JMX_PORT=9999 bin/kafka-server- start.sh config/server.properties获取监控值
在开启JMX之后最简单的监控指标的方式就是使用JConsole,可以通过jconsole连接service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi或者localhost:9999来查看相应的数据值
编程方式
javaimport javax.management.*; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; import java.io.IOException; /** * JMX Connection */ public class JmxConnectionDemo { private MBeanServerConnection conn; private String jmxURL; private String ipAndPort; public JmxConnectionDemo(String ipAndPort) { this.ipAndPort = ipAndPort; } public boolean init(){ jmxURL = "service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi"; try { JMXServiceURL serviceURL = new JMXServiceURL(jmxURL); JMXConnector connector = JMXConnectorFactory .connect(serviceURL, null); conn = connector.getMBeanServerConnection(); if (conn == null) { return false; } } catch (IOException e) { e.printStackTrace(); } return true; } public double getMsgInPerSec() { String objectName = "kafka.server:type=BrokerTopicMetrics," + "name=MessagesInPerSec"; Object val = getAttribute(objectName, "OneMinuteRate"); if (val != null) { return (double) (Double) val; } return 0.0; } private Object getAttribute(String objName, String objAttr) { ObjectName objectName; try { objectName = new ObjectName(objName); return conn.getAttribute(objectName, objAttr); } catch (MalformedObjectNameException | IOException | ReflectionException | InstanceNotFoundException | AttributeNotFoundException | MBeanException e) { e.printStackTrace(); } return null; } public static void main(String[] args) { JmxConnectionDemo jmxConnectionDemo = new JmxConnectionDemo("localhost:9999"); jmxConnectionDemo.init(); System.out.println(jmxConnectionDemo.getMsgInPerSec()); } }
broker监控指标
活跃控制器
表示 broker 是否就是当前的集群控制器,其值可以是 0 或 l。如果是 1 ,表示 broker 就是当前的控制器。任何时候,都应该只有一个 broker 是控制器,而且这个 broker 必须一直是集群控制器。如果出现了两个控制器,说明有一个本该退出的控制器线程被阻 塞了,这会导致管理任务无陆正常执行,比如移动分区。为了解决这个问题,需要将这两 个 broker 重启,而且不能通过正常的方式重启,因为此时它们无陆被正常关闭
kafka.controller:type=KafkaController,name=ActiveControllerCount值区间:0或1
请求处理器空闲率
Kafka 使用了两个线程地来处理客户端的请求:网络处理器线程池和请求处理器线程池。 网络处理器线程地负责通过网络读入和写出数据。这里没有太多的工作要做, 也就是说, 不用太过担心这些线程会出现问题。请求处理器线程地负责处理来自客户端的请求,包括 从磁盘读取消息和往磁盘写入消息。因此, broker 负载的增长对这个线程池有很大的影响。
kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent主题流入字节
主题流入字节速率使用 bis 来表示,在对 brok巳r 接收的生产者客户端悄息流量进行度量时, 这个度量指标很有用。该指标可以用于确定何时该对集群进行扩展或开展其他与规模增长 相关的工作。它也可以用于评估一个 broker 是否比集群里的其他 broker 接收了更多的流 量, 如果出现了这种情况,就需要对分区进行再均衡。
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSecRateUni.t 这是速率的时间段,在这里是“秒\ 这两个属性表明,速率是通过 bis 来表示的,不管它的值是基于多长的时间段算出的平均 值。速率还有其他 4 个不同粒度的属性。 OneMi.nuteRate 前 1 分钟的平均值。 Fi.ve问i.nuteRate 前 5 分钟的平均值。 Fi.fteenMi.nuteRate 前 15 分钟的平均值。MeanRate 从 broker 启动到目前为止的平均值。
主题流出字节
主题流出字节速率与流入字节速率类似,是另一个与规模增长有关的度量指标。流出字节速 率显示的是消费者从 broker读取消息的速率。流出速率与流入速率的伸缩方式是不一样的, 这要归功于 Kafka 对多消费者客户端的支持。
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec主题流入的消息
消息速率则以每秒生成消息个 数的方式来表示流量,而且不考虑消息的大小。这也是一个很有用的生产者流量增长规模 度量指标。它也可以与字节速率一起用于计算消息的平均大小
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec分区数量
broker 的分区数量一般不会经常发生改变,它是指分配给 broker 的分区总数。它包括 broker 的每一个分区副本,不管是首领还是跟随者
kafka.server:type=ReplicaManager,name=PartitionCount首领数量
该度量指标表示 broker 拥有的首领分区数量。与 broker 的其他度量一样,该度量指标也应 该在整个集群的 broker 上保持均等。我们需要对该指析J进行周期性地检查,井适时地发出 告警,即使在副本的数量和大小看起来都很完美的时候,它仍然能够显示出集群的不均衡 问题。因为 broker 有可能出于各种原因释放掉一个分区的首领身份,比如 Zookeeper 会话 过期,而在会话恢复之后,这个分区并不会自动拿回首领身份(除非启用了自动首领再均 衡功能)。在这些情况下,该度量指标会显示较少的首领分区数,或者直接显示为零。这 个时候需要运行一个默认的副本选举,重新均衡集群的首领
kafka.server:type=ReplicaManager,name=LeaderCount主题分区监控
主题实例的度量指标
主题实例的度量指标与之前描述的 broker 度量指标非常相似。事实上,它们之间唯一的区 别在于这里指定了主题名称,也就是说,这些度量指标属于某个指定的主题。主题实例的 度量指标数量取决于集群主题的数量,而且用户极有可能不会监控这些度量指标或设置告 警。 它们一般提供给客户端使用,客户端依此评估它们对 Kafka 的使用情况,并进行问题 调试。
分区实例的度量指标
分区实例的度量指标不如主题实例的度量指标那样有用。另外,它们的数量会更加庞大, 因为几百个主题就可能包含数千个分区。不过不管怎样,在某些情况下,它们还是有一定用处的。 Partition size 度量指标表示分区当前在磁盘上保留的数据量。如 果把它们组合在一起,就可以表示单个主题保留的数据量,作为客户端配额的依据。同一 个主题的两个不同分区之间的数据量如果存在差异,说明消息并没有按照生产消息的键 进行均句分布。 Log segment count 指标表示保存在磁盘上的日志片段的文件数量,可以与 Partition size 指标结合起来,用于跟踪资糠的使用情况。
生产者监控指标
生产者度盐指标的 MBean 名字里都包含了生产者的客户端 ID。在下面的示例里,客户端 ID使用 CLIENTID 表示, broker ID 使用 BROKERID 表示, 主题的名字使用 TOPICNAME 表示,
kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec消费者监控指标
新版本的消费者客户端将大量的度量指标属性塞进了少数 的几个MBean 里
kafka. consumer:type=consumer-metrics,client-id=CLIENTID kafka.
consumer:type=consumer-fetch-manager-metrics,client-id=CLIENTID